package cn.lsoft.undoner.service.impl;

import cn.lsoft.undoner.model.RabbitQueue;
import cn.lsoft.undoner.service.RabbitMQService;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * {@link RabbitMQService} implementation that moves messages between queues over AMQP
 * and lists queues with ready messages through an HTTP API returning a JSON array of
 * {@link RabbitQueue} entries.
 *
 * <p>The HTTP endpoint is taken from {@code rabbit.http.api.queue}; the value of the
 * {@code Authorization} header is taken from {@code rabbit.http.api.auth} and defaults
 * to HTTP Basic credentials for {@code guest:guest}.
 *
 * @author <A HREF="mailto:undoner@gmail.com">undoner</A>
 * @date 03-11-2014
 */
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    private static final Logger LOGGER = Logger.getLogger(RabbitMQServiceImpl.class);

    private static final String JOB_TOOL_EXCHANGE = "mq_message_exchange";
    /** Queue-name suffix that marks a queue as a "dead" queue. */
    private static final String DEAD_QUEUE_SUFFIX = ".dead";
    /** Shared JSON parser; Gson instances are safe to reuse. */
    private static final Gson GSON = new Gson();

    /** Which queues a listing should keep, based on the dead-queue suffix. */
    private enum QueueScope {
        DEAD_ONLY, NON_DEAD_ONLY, ALL
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;
    private DefaultHttpClient httpClient;
    @Value("${rabbit.http.api.queue}")
    private String rabbitApiUrl;
    @Value("${rabbit.http.api.auth:Basic Z3Vlc3Q6Z3Vlc3Q=}")
    private String rabbitAuth;

    /**
     * Creates the HTTP client used by the queue-listing methods.
     * Socket and connection timeouts are not configured here, so library defaults apply.
     */
    @PostConstruct
    public void init() {
        httpClient = new DefaultHttpClient();
        // NOTE(review): passing null looks like it resets the client to its default retry
        // handler rather than disabling retries - confirm against the HttpClient version in use.
        httpClient.setHttpRequestRetryHandler(null);
    }

    /**
     * Releases the HTTP connections held by the client when the bean is destroyed.
     */
    @PreDestroy
    public void destroy() {
        if (httpClient != null) {
            httpClient.getConnectionManager().shutdown();
        }
    }

    /**
     * Moves messages from {@code fromQueue} to {@code toQueue} through the
     * {@code mq_message_exchange} direct exchange (declared durable, with both queues
     * bound to it under their own names as routing keys).
     *
     * <p>Each message is fetched without auto-ack, republished with its original
     * properties, and only then acknowledged on the source queue. If republishing fails
     * the message is requeued on the source queue and the transfer stops.
     *
     * @param fromQueue queue to drain
     * @param toQueue   queue to receive the messages
     * @param limit     maximum number of messages to move; {@code null}, zero or a
     *                  negative value means "until the source queue is empty"
     * @return number of messages that were republished and acknowledged; {@code 0} when
     *         the exchange/bindings could not be set up, or when an unbounded transfer
     *         from a queue onto itself is requested
     */
    @Override
    public int reSend(final String fromQueue, final String toQueue, Integer limit) {
        final int max = (limit == null) ? 0 : limit;

        // An unbounded transfer onto the same queue would keep re-reading the messages it
        // has just republished and never finish.
        if (max <= 0 && fromQueue != null && fromQueue.equals(toQueue)) {
            LOGGER.warn("reSend refused: source and target queue are both '" + fromQueue
                    + "' and no limit was given");
            return 0;
        }

        Integer moved = rabbitTemplate.execute(new ChannelCallback<Integer>() {
            @Override
            public Integer doInRabbit(Channel channel) throws Exception {
                try {
                    channel.exchangeDeclare(JOB_TOOL_EXCHANGE, "direct", true);
                    channel.queueBind(fromQueue, JOB_TOOL_EXCHANGE, fromQueue);
                    channel.queueBind(toQueue, JOB_TOOL_EXCHANGE, toQueue);
                } catch (IOException e) {
                    LOGGER.error("queue not found", e);
                    return 0;
                }

                int count = 0;
                GetResponse response;
                while ((response = channel.basicGet(fromQueue, false)) != null) {
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    try {
                        // Forward the original properties so delivery mode, headers,
                        // content type etc. survive the move.
                        channel.basicPublish(JOB_TOOL_EXCHANGE, toQueue, response.getProps(),
                                response.getBody());
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("reSend Message:" + new String(response.getBody()));
                        }
                        channel.basicAck(deliveryTag, false);
                        count++;
                    } catch (IOException e) {
                        LOGGER.error("reSend failed from '" + fromQueue + "' to '" + toQueue
                                + "' after " + count + " message(s); requeueing current message", e);
                        channel.basicReject(deliveryTag, true);
                        // Stop here: the requeued message would be fetched again immediately.
                        break;
                    }
                    if (max > 0 && count >= max) {
                        break;
                    }
                }
                return count;
            }
        });
        return (moved == null) ? 0 : moved;
    }

    /**
     * Lists queues whose name ends with {@code .dead} and that have ready messages.
     *
     * @return matching queues (possibly empty), or {@code null} if the HTTP API call failed
     */
    @Override
    public List<RabbitQueue> listMessageReadyDeadQueue() {
        return listMessageReady(QueueScope.DEAD_ONLY);
    }

    /**
     * Lists every queue that has ready messages, regardless of its name.
     *
     * @return matching queues (possibly empty), or {@code null} if the HTTP API call failed
     */
    @Override
    public List<RabbitQueue> listMessageReadyAllQueue() {
        return listMessageReady(QueueScope.ALL);
    }

    /**
     * Lists queues whose name does not end with {@code .dead} and that have ready messages.
     *
     * @return matching queues (possibly empty), or {@code null} if the HTTP API call failed
     */
    @Override
    public List<RabbitQueue> listMessageReadyQueue() {
        return listMessageReady(QueueScope.NON_DEAD_ONLY);
    }

    /**
     * Fetches the queue list from the HTTP API and keeps the queues that have ready
     * messages and fall into the requested scope.
     *
     * @param scope which queues to keep
     * @return matching queues, or {@code null} when the request, the response status or
     *         the JSON parsing failed (the failure is logged at WARN level)
     */
    private List<RabbitQueue> listMessageReady(QueueScope scope) {
        HttpGet httpGet = new HttpGet(rabbitApiUrl);
        httpGet.setHeader("Authorization", rabbitAuth);
        try {
            HttpResponse httpResponse = httpClient.execute(httpGet);
            // Read the body before inspecting the status so the connection is always released.
            String body = (httpResponse.getEntity() == null)
                    ? null
                    : EntityUtils.toString(httpResponse.getEntity());

            int status = httpResponse.getStatusLine().getStatusCode();
            if (status < 200 || status >= 300) {
                LOGGER.warn("http client fail: status " + status + " from " + rabbitApiUrl);
                return null;
            }

            List<RabbitQueue> allQueues = (body == null)
                    ? null
                    : GSON.<List<RabbitQueue>>fromJson(body, new TypeToken<List<RabbitQueue>>() {
                    }.getType());
            if (allQueues == null) {
                LOGGER.warn("http client fail: empty queue list response from " + rabbitApiUrl);
                return null;
            }

            List<RabbitQueue> matching = new ArrayList<RabbitQueue>();
            for (RabbitQueue queue : allQueues) {
                if (queue == null || queue.getMessages_ready() == 0) {
                    continue;
                }
                if (inScope(scope, queue.getName())) {
                    matching.add(queue);
                }
            }
            return matching;
        } catch (Exception e) {
            LOGGER.warn("http client fail", e);
            return null;
        }
    }

    /**
     * Tells whether a queue name belongs to the given scope; a {@code null} name is
     * treated as not being a dead queue.
     */
    private static boolean inScope(QueueScope scope, String queueName) {
        if (scope == QueueScope.ALL) {
            return true;
        }
        boolean dead = queueName != null && queueName.endsWith(DEAD_QUEUE_SUFFIX);
        return (scope == QueueScope.DEAD_ONLY) == dead;
    }
}
